#ifndef COMMON_EXAMPLE_KAFKA_H
#define COMMON_EXAMPLE_KAFKA_H
#include <librdkafka/rdkafka.h>
#include <string>
#include <vector>
#include <functional>
#include <atomic>

namespace KafkaClient {

	/**
	 * Kafka producer wrapper bound to one broker list and one topic.
	 *
	 * The object stores a raw librdkafka handle (`rk`) and declares its own
	 * destructor. Copying is therefore disabled: a member-wise copy would
	 * duplicate the raw pointer and leave two objects referring to the same
	 * handle.
	 *
	 * All member functions are defined outside this header.
	 */
	class kafka_producer
	{
	public:
		/**
		 * @param brokers  broker list string (stored in m_brokers by the
		 *                 implementation -- presumably; confirm in the .cpp)
		 * @param topic    topic name this producer writes to
		 */
		explicit kafka_producer(
				const std::string & brokers,
				const std::string & topic);
		virtual ~kafka_producer();

		// Non-copyable: the raw handle below must have exactly one owner.
		kafka_producer(const kafka_producer &) = delete;
		kafka_producer & operator=(const kafka_producer &) = delete;
	public:
		/**
		 * Write one message.
		 *
		 * @param data    pointer to the payload bytes
		 * @param len     payload length in bytes
		 * @param maxTry  attempt limit, default 10
		 *                (NOTE(review): presumably the number of retries when
		 *                the send cannot be queued -- confirm in the .cpp)
		 * @param key     optional message key, nullptr for none
		 * @param keylen  key length in bytes, 0 when no key is given
		 * @return bool status; presumably true on success -- confirm in the .cpp
		 */
		bool write(
				const char * data,
				const int len,
				const int maxTry = 10,
				const char * key = nullptr,
				const int keylen = 0);
	protected:
		// Set-up / tear-down helpers.
		// NOTE(review): presumably called from the constructor and destructor
		// to create and release `rk` -- confirm in the .cpp.
		bool init();
		bool exit();
	protected:
		rd_kafka_t *rk = nullptr;       /* Producer instance handle */
		std::string m_topic;            /* Target topic name */
		std::string m_brokers;          /* Broker list string */

	};

	/**
	 * Kafka consumer wrapper bound to a broker list, a set of topics and a
	 * consumer group.
	 *
	 * The object stores a raw librdkafka handle (`rk`) and an atomic stop
	 * flag. It is non-copyable (the atomic member already prevents copying;
	 * the deleted operations below make that explicit).
	 *
	 * All member functions are defined outside this header.
	 */
	class kafka_consumer{
	public:
		/**
		 * @param brokers  broker list string
		 * @param topics   topic names to consume (taken by value; the
		 *                 signature must match the out-of-line definition)
		 * @param group    consumer group identifier
		 */
		explicit kafka_consumer(
				const std::string & brokers,
				const std::vector<std::string> topics,
				const std::string & group);
		virtual ~kafka_consumer();

		// Non-copyable: the raw handle below must have exactly one owner.
		kafka_consumer(const kafka_consumer &) = delete;
		kafka_consumer & operator=(const kafka_consumer &) = delete;
	protected:
		// Set-up / tear-down helpers.
		// NOTE(review): presumably called from the constructor and destructor
		// to create and release `rk` -- confirm in the .cpp.
		bool init();
		bool exit();
	public:
		/**
		 * Request that consumption stops.
		 * NOTE(review): presumably sets m_stop so that run() returns --
		 * confirm in the .cpp.
		 */
		bool stop();
		/**
		 * Consume messages, handing each one to `cb`.
		 *
		 * @param cb  callable invoked with a pointer to a librdkafka message
		 *            (NOTE(review): message ownership/lifetime is decided by
		 *            the implementation -- confirm before retaining the pointer)
		 * @return bool status; meaning defined by the implementation
		 */
		bool run(std::function<void (rd_kafka_message_t *)> cb );
	protected:
		rd_kafka_t *rk = nullptr;       /* Consumer instance handle */
		// Explicitly initialised: a default-constructed std::atomic holds an
		// indeterminate value before C++20. A constructor mem-initializer, if
		// the implementation has one, still takes precedence over this default.
		std::atomic<bool> m_stop{false};
		std::vector<std::string> m_topics;  /* Topics to consume */
		std::string m_brokers;              /* Broker list string */
		std::string m_group;                /* Consumer group identifier */
	};

	// Default printf-style callback.
	// cbprintf is an assignable function pointer taking a format string
	// followed by variadic arguments; csprintf is a function with the same
	// parameter shape. Both are only declared here and defined elsewhere.
	// NOTE(review): presumably cbprintf is the hook the client reports
	// through and csprintf is its default target -- confirm in the .cpp.
	extern void (*cbprintf) (const char *,...);
	void csprintf (const char * format,...);
}

#endif // COMMON_EXAMPLE_KAFKA_H
